package com.proxy.server.frontend.handler.execute;

import com.proxy.common.constant.ErrorCodeType;
import com.proxy.common.constant.IsolationType;
import com.proxy.common.constant.PacketType;
import com.proxy.common.exception.ProxyIoException;
import com.proxy.common.packet.*;
import com.proxy.common.utils.CharsetUtil;
import com.proxy.server.backend.Trigger;
import com.proxy.server.backend.TriggerBase;
import com.proxy.server.backend.bio.BioBackendConnection;
import com.proxy.server.backend.bio.SyncReceiver;
import com.proxy.server.frontend.FrontendConnection;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.List;

/**
 * Created by liufish on 16/7/24.
 */
public class BioSingleExecute {

    private static final Logger logger = LogManager.getLogger(BioSingleExecute.class);
    /**
     * <pre>
     *     经路由分析后得到前端和后端的链接以及转发命令commandPacket
     *     在单线程过程中响应及其等待结果。
     * </pre>
     * @param frontendConnection
     * @param backendConnection
     * @param commandPacket
     */
    public void execute(final FrontendConnection frontendConnection, BioBackendConnection backendConnection, BinaryPacket frontendBin, CommandPacket commandPacket){
        try {

            this._checkCharset(frontendConnection,backendConnection);
            this._checkTxIsolation(frontendConnection,backendConnection);
            this._checkAutocommit(frontendConnection,backendConnection);

            commandPacket.write(backendConnection.getOutputStream());

            Trigger trigger = new Trigger() {

                private int packetId = 0;

                @Override
                public void onSuccess(BinaryPacket bin) {
                    bin.packetId = ++ packetId;
                    frontendConnection.write(bin);
                }

                @Override
                public void onError(BinaryPacket bin) {
                    bin.packetId = ++ packetId;
                    frontendConnection.write(bin);
                }

                @Override
                public void onHeadFieldsEof(BinaryPacket head, List<BinaryPacket> fields, BinaryPacket eof) {
                    head.packetId = ++ packetId;
                    frontendConnection.write(head);
                    for (BinaryPacket bin : fields){
                        bin.packetId = ++ packetId;
                        frontendConnection.write(bin);
                    }
                    eof.packetId = ++ packetId;
                    frontendConnection.write(eof);
                }

                @Override
                public void onRow(BinaryPacket row) {
                    row.packetId = ++ packetId;
                    frontendConnection.write(row);
                }

                @Override
                public void onRowsEof(BinaryPacket bin) {
                    bin.packetId = ++ packetId;
                    frontendConnection.write(bin);
                }
            };

            SyncReceiver receiver = new SyncReceiver();
            receiver.setTrigger(trigger);
            receiver.receive(backendConnection);

        }catch (Exception ex){
            ErrorPacket err = new ErrorPacket();
            err.packetId = 0;
            err.errNo = ErrorCodeType.ER_YES;
            try {
                err.message = ex.getMessage().getBytes(frontendConnection.getCharset());
                frontendConnection.write(err);
            } catch (UnsupportedEncodingException e) {
                throw new ProxyIoException(e);
            }

        }

    }


    private void _checkCharset(final FrontendConnection frontendConnection, final BioBackendConnection backendConnection)throws IOException{

        if(frontendConnection.getCharsetIndex() != backendConnection.getCharsetIndex()){

            final String charset = CharsetUtil.getDbCharset(frontendConnection.getCharsetIndex());
            StringBuilder builder = new StringBuilder();
            builder.append("SET NAMES ").append(charset);
            CommandPacket cmd = new CommandPacket();
            cmd.packetId = 0;
            cmd.packetType = PacketType.COM_QUERY;
            cmd.data = builder.toString().getBytes();

            cmd.write(backendConnection.getOutputStream());

            Trigger trigger = new TriggerBase() {
                @Override
                public void onSuccess(BinaryPacket bin) {
                    backendConnection.setCharsetIndex(frontendConnection.getCharsetIndex());
                    backendConnection.setCharset(frontendConnection.getCharset());
                    backendConnection.setDbCharset(frontendConnection.getDbCharset());
                }

                @Override
                public void onError(BinaryPacket bin) {
                    ErrorPacket err = new ErrorPacket();
                    err.read(bin);
                    throw new ProxyIoException("sync charset error");
                }
            };

            SyncReceiver receiver = new SyncReceiver();
            receiver.setTrigger(trigger);
            receiver.receive(backendConnection);
        }
    }



    private void _checkAutocommit(final FrontendConnection frontendConnection,final BioBackendConnection backendConnection) throws IOException{

        if(frontendConnection.isAutoCommit() != backendConnection.isAutoCommit()){

            CommandPacket autoCommitPacket = new CommandPacket();
            autoCommitPacket.packetId = 0;
            autoCommitPacket.packetType = PacketType.COM_QUERY;
            byte [] data = null;
            if(frontendConnection.isAutoCommit()){
                data = "SET autocommit=1".getBytes();
            }else {
                data = "SET autocommit=0".getBytes();
            }
            autoCommitPacket.data = data;
            autoCommitPacket.write(backendConnection.getOutputStream());


            Trigger trigger = new TriggerBase() {
                @Override
                public void onSuccess(BinaryPacket bin) {
                    backendConnection.setAutoCommit(frontendConnection.isAutoCommit());
                }

                @Override
                public void onError(BinaryPacket bin) {
                    throw new ProxyIoException("sync autocommit error");
                }
            };

            SyncReceiver receiver = new SyncReceiver();
            receiver.setTrigger(trigger);
            receiver.receive(backendConnection);
        }
    }


    private void _checkTxIsolation(final FrontendConnection frontendConnection,final BioBackendConnection backendConnection)throws IOException{

        if(frontendConnection.getTxIsolation() != backendConnection.getTxIsolation()){

            CommandPacket txPacket = new CommandPacket();
            txPacket.packetId = 0;
            txPacket.packetType = PacketType.COM_QUERY;
            byte [] data = null;
            switch (frontendConnection.getTxIsolation()){
                case IsolationType.READ_UNCOMMITTED:
                    data = "SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED".getBytes();
                    break;
                case IsolationType.READ_COMMITTED:
                    data = "SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED".getBytes();
                    break;
                case IsolationType.REPEATED_READ:
                    data = "SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ".getBytes();
                    break;
                case IsolationType.SERIALIZABLE:
                    data = "SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE".getBytes();
                    break;

                default:
                    throw new ProxyIoException("txIsolation:" + frontendConnection.getTxIsolation());

            }
            txPacket.data = data;
            txPacket.write(backendConnection.getOutputStream());

            Trigger trigger = new TriggerBase() {
                @Override
                public void onSuccess(BinaryPacket bin) {
                    backendConnection.setTxIsolation(frontendConnection.getTxIsolation());
                }

                @Override
                public void onError(BinaryPacket bin) {
                    throw new ProxyIoException("sync txIsolation error");
                }
            };

            SyncReceiver receiver = new SyncReceiver();
            receiver.setTrigger(trigger);
            receiver.receive(backendConnection);
        }

    }

}
